{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Tce3stUlHN0L"
      },
      "source": [
        "##### Copyright 2019 The TensorFlow Authors.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "tuOe1ymfHZPu"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MfBg1C5NB3X0"
      },
      "source": [
        "# Estimator を使ったマルチワーカートレーニング\n",
        "\n",
        "<table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "  <td>\n",
        "<img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\"><a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/docs-l10n/blob/master/site/ja/tutorials/distribute/multi_worker_with_estimator.ipynb\">TensorFlow.org で表示</a>\n",
        "</td>\n",
        "  <td>\n",
        "<img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\"><a target=\"_blank\" href=\"https://github.com/tensorflow/docs-l10n/blob/master/site/ja/tutorials/distribute/multi_worker_with_estimator.ipynb\">Google Colab で実行</a>\n",
        "</td>\n",
        "  <td>\n",
        "<img src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\"><a target=\"_blank\" href=\"https://storage.googleapis.com/tensorflow_docs/docs-l10n/site/ja/tutorials/distribute/multi_worker_with_estimator.ipynb\">GitHub でソースを表示</a>\n",
        "</td>\n",
        "  <td>\n",
        "<img src=\"https://www.tensorflow.org/images/download_logo_32px.png\"><a href=\"https://storage.googleapis.com/tensorflow_docs/docs/site/en/tutorials/distribute/multi_worker_with_estimator.ipynb\">ノートブックをダウンロード</a>\n",
        "</td>\n",
        "</table>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "xHxb-dlhMIzW"
      },
      "source": [
        "## 概要\n",
        "\n",
        "注意: `tf.distribute` API で Estimator を使用する際、`tf.distribute` で Keras を使用することを推奨します。[Keras を使ったマルチワーカートレーニング](multi_worker_with_keras.ipynb)をご覧ください。`tf.distribute.Strategy` を使った Estimator トレーニングのサポートは制限されています。\n",
        "\n",
        "このチュートリアルでは、`tf.estimator` を使った分散型マルチワーカートレーニングに `tf.distribute.Strategy` を使用する方法を実演しています。`tf.estimator` を使って独自のコードを記述しており、高性能の単一の機械を超えるスケーリングに関心がある場合は、このチュートリアルをご利用ください。\n",
        "\n",
        "始める前に、[分散ストラテジー](../../guide/distributed_training.ipynb)ガイドをお読みください。[マルチ GPU トレーニングのチュートリアル](./keras.ipynb)も関連しています。このチュートリアルでは同じモデルが使用されています。\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MUXex9ctTuDB"
      },
      "source": [
        "## セットアップ\n",
        "\n",
        "最初に、TensorFlow と必要なインポートをセットアップします。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "bnYxvfLD-LW-"
      },
      "outputs": [],
      "source": [
        "import tensorflow_datasets as tfds\n",
        "import tensorflow as tf\n",
        "tfds.disable_progress_bar()\n",
        "\n",
        "import os, json"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "hPBuZUNSZmrQ"
      },
      "source": [
        "## 入力関数\n",
        "\n",
        "このチュートリアルでは、[TensorFlow Datasets](https://www.tensorflow.org/datasets) の MNIST データセットを使用しています。このコードは[マルチ GPU トレーニングのチュートリアル](./keras.ipynb)のコードに似ていますが、大きな違いが 1 つあります。マルチワーカートレーニングに Estimator を使用する際は、モデルのコンバージェンスを可能にできるよう、ワーカーの数でデータセットをシャーディングする必要があります。入力データは、ワーカーインデックスでシャーディングされるため、各ワーカーは、データセットの各 `1/num_workers` の部分を処理します。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "dma_wUAxZqo2"
      },
      "outputs": [],
      "source": [
        "BUFFER_SIZE = 10000\n",
        "BATCH_SIZE = 64\n",
        "\n",
        "def input_fn(mode, input_context=None):\n",
        "  datasets, info = tfds.load(name='mnist',\n",
        "                                with_info=True,\n",
        "                                as_supervised=True)\n",
        "  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else\n",
        "                   datasets['test'])\n",
        "\n",
        "  def scale(image, label):\n",
        "    image = tf.cast(image, tf.float32)\n",
        "    image /= 255\n",
        "    return image, label\n",
        "\n",
        "  if input_context:\n",
        "    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,\n",
        "                                        input_context.input_pipeline_id)\n",
        "  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4BlcVXMhB59T"
      },
      "source": [
        "コンバージェンスを達成するためのもう 1 つの合理的なアプローチとして、各ワーカーで異なるシードを使ってデータベースをシャッフルする方法があります。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8YFpxrcsZ2xG"
      },
      "source": [
        "## マルチワーカー構成\n",
        "\n",
        "このチュートリアルの主な違いの 1 つに（[マルチ GPU トレーニングのチュートリアル](./keras.ipynb) と比較）、マルチワーカーのセットアップがあります。`TF_CONFIG` 環境変数は、クラスタの一部である各ワーカーにクラスタ構成を指定する標準的な方法です。\n",
        "\n",
        "`TF_CONFIG` には、`cluster` と `task` の 2 つのコンポーネントがあります。`cluster` は、クラスタのワーカーとパラメータサーバーを含むクラスタ全体に関する情報を提供するのに対し、`task` は、現在のタスクに関する情報を提供します。最初のコンポーネント `cluster` は、クラスタ内のすべてのワーカーとパラメータサーバーで同一であり、2 つ目のコンポーネント `task` は、各ワーカーとパラメータサーバー間で異なり、それぞれに `type` と `index` を指定します。この例では、タスクの `type` は `worker` で、タスクの `index` は `0` です。\n",
        "\n",
        "説明の目的により、このチュートリアルでは、`localhost` 上に 2 つのワーカーを持つ `TF_CONFIG` の設定方法を示しています。実践として、外部 IP アドレスとポートに複数のワーカーを作成し、各ワーカーに適切に `TF_CONFIG` を設定します（タスクの `index` を変更します）。\n",
        "\n",
        "警告: *次のコードを Colab で実行しないでください。*  TensorFlow のランタイムは、指定された IP アドレスとポートに gRPC サーバーを作成しようとしますが、失敗する可能性があります。\n",
        "\n",
        "```\n",
        "os.environ['TF_CONFIG'] = json.dumps({     'cluster': {         'worker': [\"localhost:12345\", \"localhost:23456\"]     },     'task': {'type': 'worker', 'index': 0} })\n",
        "```\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "qDreJzTffAP5"
      },
      "source": [
        "## モデルを定義する\n",
        "\n",
        "トレーニング用にレイヤー、オプティマイザ、および損失関数を記述します。このチュートリアルでは、[マルチ GPU トレーニングのチュートリアル](./keras.ipynb)と同様に、Keras レイヤーを使ったモデルを定義しています。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "WNvOn_OeiUYC"
      },
      "outputs": [],
      "source": [
        "LEARNING_RATE = 1e-4\n",
        "def model_fn(features, labels, mode):\n",
        "  model = tf.keras.Sequential([\n",
        "      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),\n",
        "      tf.keras.layers.MaxPooling2D(),\n",
        "      tf.keras.layers.Flatten(),\n",
        "      tf.keras.layers.Dense(64, activation='relu'),\n",
        "      tf.keras.layers.Dense(10)\n",
        "  ])\n",
        "  logits = model(features, training=False)\n",
        "\n",
        "  if mode == tf.estimator.ModeKeys.PREDICT:\n",
        "    predictions = {'logits': logits}\n",
        "    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)\n",
        "\n",
        "  optimizer = tf.compat.v1.train.GradientDescentOptimizer(\n",
        "      learning_rate=LEARNING_RATE)\n",
        "  loss = tf.keras.losses.SparseCategoricalCrossentropy(\n",
        "      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)\n",
        "  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)\n",
        "  if mode == tf.estimator.ModeKeys.EVAL:\n",
        "    return tf.estimator.EstimatorSpec(mode, loss=loss)\n",
        "\n",
        "  return tf.estimator.EstimatorSpec(\n",
        "      mode=mode,\n",
        "      loss=loss,\n",
        "      train_op=optimizer.minimize(\n",
        "          loss, tf.compat.v1.train.get_or_create_global_step()))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "P94PrIW_kSCE"
      },
      "source": [
        "注意: この例の学習速度は固定されていますが、一般的に、グローバルバッチサイズに基づいて学習速度を調整する必要があります。"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UhNtHfuxCGVy"
      },
      "source": [
        "## MultiWorkerMirroredStrategy\n",
        "\n",
        "モデルをトレーニングするために、 `tf.distribute.experimental.MultiWorkerMirroredStrategy` のインスタンスを作成します。 `MultiWorkerMirroredStrategy` は、すべてのワーカーの各装置にあるモデルのレイヤーにすべての変数のコピーを作成します。集合通信に使用する TensorFlow 演算子 `CollectiveOps` を使用して勾配を集め、変数の同期を維持します。このストラテジーの詳細は、[`tf.distribute.Strategy` ガイド](../../guide/distributed_training.ipynb)で説明されています。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "1uFSHCJXMrQ-"
      },
      "outputs": [],
      "source": [
        "strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "H47DDcOgfzm7"
      },
      "source": [
        "## モデルをトレーニングして評価する\n",
        "\n",
        "次に、分散ストラテジーを Estimator の `RunConfig` に指定し、`tf.estimator.train_and_evaluate` を呼び出してトレーニングと評価を行います。このチュートリアルでは、`train_distribute` 経由でストラテジーを指定してトレーニングのみを分散しています。`eval_distribute` を使って評価を分散することもできます。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "BcsuBYrpgnlS"
      },
      "outputs": [],
      "source": [
        "config = tf.estimator.RunConfig(train_distribute=strategy)\n",
        "\n",
        "classifier = tf.estimator.Estimator(\n",
        "    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)\n",
        "tf.estimator.train_and_evaluate(\n",
        "    classifier,\n",
        "    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),\n",
        "    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "XVk4ftYx6JAO"
      },
      "source": [
        "## トレーニングのパフォーマンスを最適化する\n",
        "\n",
        "`tf.distribute.Strategy` により、モデルとマルチワーカー対応 Estimator の準備が整いました。次のテクニックに従って、マルチワーカートレーニングのパフォーマンスを最適化することができます。\n",
        "\n",
        "- *バッチサイズの増加:* ここで指定されるバッチサイズは、GPU 単位のサイズです。一般的に、GPU メモリに収まる最大バッチサイズの指定が推奨されます。\n",
        "\n",
        "- *変数のキャスト:* 可能であれば、`tf.float` に変数をキャストしてください。公式の ResNet モデルには、どのようにしてこれを行うかの[例](https://github.com/tensorflow/models/blob/8367cf6dabe11adf7628541706b660821f397dce/official/resnet/resnet_model.py#L466)が示されています。\n",
        "\n",
        "- *集合通信の使用:* `MultiWorkerMirroredStrategy` は、複数の[集合通信実装](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/distribute/cross_device_ops.py)を提供しています。\n",
        "\n",
        "    - `RING` は、クロスホスト通信レイヤーとして、gRPC を使用したリング状の集合体を実装します。\n",
        "    - `NCCL` は、[Nvidia の NCCL](https://developer.nvidia.com/nccl) を使用して集合体を実装します。\n",
        "    - `AUTO` は、選択をランタイムに持ち越します。\n",
        "\n",
        "    最適な集合体実装の選択肢は、GPU 数と種類によって異なり、ネットワークはクラスタ内で相互接続します。自動選択をオーバーライドするには、`MultiWorkerMirroredStrategy` コンストラクタの `communication` パラメータに、 `communication=tf.distribute.experimental.CollectiveCommunication.NCCL` のように有効な値を指定します。\n",
        "\n",
        "ガイドの[パフォーマンスのセクション](../../guide/function.ipynb)に目を通し、独自の TensorFlow モデルのパフォーマンス最適化に使用できるほかのストラテジーや[ツール](../../guide/profiler.md)についてさらに詳しく学習しましょう。\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "AW0Hb2xM6EGX"
      },
      "source": [
        "## その他のコード例\n",
        "\n",
        "1. Kubernetes テンプレートを使った tensorflow/ecosystem でマルチワーカートレーニングを行うための[エンドツーエンドの例](https://github.com/tensorflow/ecosystem/tree/master/distribution_strategy)。この例は最初に Keras モデルを使用し、それを `tf.keras.estimator.model_to_estimator` API を使って Estimator に変換します。\n",
        "2. [公式モデル](https://github.com/tensorflow/models/tree/master/official)。この多くは、複数の分散ストラテジーで実行するように構成できます。\n"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [
        "Tce3stUlHN0L"
      ],
      "name": "multi_worker_with_estimator.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
